package gecko.lang;

import gecko.x.ClassX;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

/**
 * A staged event pipeline built from an ordered group of bounded FIFO queues.
 *
 * <ol>
 *   <li>Every handler registered through {@link #addLast(Handler)} gets its own
 *       queue; the registration order defines the stage index.</li>
 *   <li>When a queue is full, the {@code emmit*} methods block until space becomes
 *       available, which throttles producers feeding that stage.</li>
 * </ol>
 *
 * <p>After {@link #start()}, one selector thread per stage takes elements from the
 * stage's queue and hands each of them to a shared worker pool, where the stage's
 * handler is invoked.
 *
 * <p>Lifecycle: register all handlers, call {@link #start()} once, and finally call
 * {@link #shutdown()}. A reactor cannot be restarted after shutdown.
 *
 * <p>NOTE(review): selectors pass each element to an unbounded cached thread pool as
 * soon as it is taken, so a queue only fills up while its selector is slower than the
 * producers — confirm whether stricter throttling of handler execution is intended.
 *
 * @param <T> type of the elements flowing through the stages
 * @author Yoojia Chen (yoojiachen@gmail.com)
 * @since 1.0.0
 */
public class Reactor<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);

    /** Pool that executes the handler invocations. */
    private final ExecutorService mWorkers = Executors.newCachedThreadPool();
    /** One bounded queue per stage; index i belongs to the handler at index i. */
    private final List<BlockingQueue<T>> mQueueGroup = new ArrayList<>();
    private final List<Handler<T>> mEventHandlers = new ArrayList<>();
    private final int mQueueCapacity;
    /**
     * Pool running one selector loop per stage; {@code null} until {@link #start()}.
     * Volatile because {@link #shutdown()} may be called from another thread than
     * the one that called {@link #start()}.
     */
    private volatile ExecutorService mSelectorThreads;

    /**
     * Creates a reactor whose stage queues each hold at most {@code queueCapacity}
     * elements.
     *
     * @param queueCapacity capacity of every stage queue; must be positive
     * @throws IllegalArgumentException if {@code queueCapacity} is not positive
     */
    public Reactor(int queueCapacity) {
        // Fail fast here instead of letting LinkedBlockingQueue reject it in addLast().
        if (queueCapacity <= 0) {
            throw new IllegalArgumentException("queueCapacity must be positive: " + queueCapacity);
        }
        mQueueCapacity = queueCapacity;
    }

    /**
     * Appends a new stage, served by {@code handler}, to the end of the pipeline.
     *
     * @param handler handler invoked for every element emitted to the new stage
     * @throws NullPointerException  if {@code handler} is {@code null}
     * @throws IllegalStateException if the reactor has already been started; a stage
     *                               added afterwards would have no selector draining it
     */
    public void addLast(Handler<T> handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        if (mSelectorThreads != null) {
            throw new IllegalStateException("Handlers must be added before start()");
        }
        mQueueGroup.add(new LinkedBlockingQueue<>(mQueueCapacity));
        mEventHandlers.add(handler);
    }

    /**
     * Emits {@code ele} to the stage following stage {@code id}, blocking while that
     * stage's queue is full.
     *
     * @param id  index of the current stage (as passed to {@link Handler#handle})
     * @param ele element to enqueue; must not be {@code null}
     * @throws IndexOutOfBoundsException if there is no stage at {@code id + 1}
     * @throws InterruptedException      if interrupted while waiting for queue space
     */
    public void emmitNext(int id, T ele) throws InterruptedException {
        emmitAt(id + 1, ele);
    }

    /**
     * Emits {@code ele} to the last stage, blocking while its queue is full.
     *
     * @param ele element to enqueue; must not be {@code null}
     * @throws IndexOutOfBoundsException if no stage has been registered
     * @throws InterruptedException      if interrupted while waiting for queue space
     */
    public void emmitEnd(T ele) throws InterruptedException {
        emmitAt(mQueueGroup.size() - 1, ele);
    }

    /**
     * Emits {@code ele} to the first stage, blocking while its queue is full.
     *
     * @param ele element to enqueue; must not be {@code null}
     * @throws IndexOutOfBoundsException if no stage has been registered
     * @throws InterruptedException      if interrupted while waiting for queue space
     */
    public void emmitStart(T ele) throws InterruptedException {
        emmitAt(0, ele);
    }

    /** Puts {@code ele} into the queue of stage {@code id} after validating the index. */
    private void emmitAt(int id, T ele) throws InterruptedException {
        final int stages = mQueueGroup.size();
        if (id < 0 || id >= stages) {
            throw new IndexOutOfBoundsException(
                    "No stage at index " + id + " (registered stages: " + stages + ")");
        }
        mQueueGroup.get(id).put(ele);
    }

    /**
     * Starts one selector thread per registered stage.
     *
     * @throws IllegalStateException if the reactor was already started, or if no
     *                               handler has been registered
     */
    public void start() {
        if (mSelectorThreads != null) {
            // A second pool would leak the first one and double the consumers per queue.
            throw new IllegalStateException("Reactor has already been started");
        }
        final int size = mQueueGroup.size();
        if (size == 0) {
            // Executors.newFixedThreadPool(0) would fail with a far less helpful message.
            throw new IllegalStateException("No handler registered; call addLast() before start()");
        }
        final ExecutorService selectors = Executors.newFixedThreadPool(size);
        mSelectorThreads = selectors;
        for (int i = 0; i < size; i++) {
            final int index = i;
            selectors.submit(() -> runSelector(index));
        }
    }

    /**
     * Selector loop of stage {@code index}: takes elements from the stage's queue and
     * dispatches each one to the worker pool until the thread is interrupted or the
     * worker pool stops accepting tasks.
     */
    private void runSelector(int index) {
        final Handler<T> handler = mEventHandlers.get(index);
        final BlockingQueue<T> queue = mQueueGroup.get(index);
        while (!Thread.currentThread().isInterrupted()) {
            final T e;
            try {
                e = queue.take();
            } catch (InterruptedException interrupted) {
                // Restore the flag so the owning pool can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                mWorkers.submit(() -> {
                    try {
                        handler.handle(index, e);
                    } catch (Exception ex) {
                        LOGGER.error("Handler抛出无法处理异常:" + ClassX.nameOf(handler), ex);
                    }
                });
            } catch (RejectedExecutionException rejected) {
                // The worker pool was shut down between take() and submit(); stop this
                // selector explicitly instead of dying silently inside a Future.
                LOGGER.warn("Worker pool rejected an element of stage {}; selector stops", index, rejected);
                return;
            }
        }
    }

    /**
     * Stops the reactor. Selector threads are interrupted so that loops blocked on an
     * empty queue terminate; the worker pool stops accepting new tasks while handler
     * invocations that were already submitted still run. Elements remaining in the
     * queues are not processed. Safe to call when {@link #start()} was never invoked.
     */
    public void shutdown() {
        final ExecutorService selectors = mSelectorThreads;
        if (selectors != null) {
            // shutdown() alone never interrupts take(), so the selector threads would
            // stay alive forever; shutdownNow() interrupts them.
            selectors.shutdownNow();
        }
        // Stop the selectors first so they do not submit into a closed worker pool.
        mWorkers.shutdown();
    }

    /**
     * Processes the elements of one stage.
     *
     * @param <T> type of the handled elements
     */
    @FunctionalInterface
    public interface Handler<T> {
        /**
         * Handles one element.
         *
         * @param index index of the stage this handler is registered at
         * @param ele   element taken from the stage's queue
         * @throws Exception on failure; the reactor logs it and continues with the
         *                   next element
         */
        void handle(int index, T ele) throws Exception;
    }
}
